// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package plan

import (
	"errors"
	"io"
	"sync"
	"sync/atomic"

	"github.com/dolthub/go-mysql-server/sql"
)

// CachedResultsGlobalCache manages the caches created by CachedResults nodes.
// It is the package-level manager from which NewCachedResults allocates node
// IDs, and which CachedResults.GetCachedResults and CachedResults.Dispose
// consult using each node's Id as the key.
var CachedResultsGlobalCache = NewCachedResultsManager()

var (
	// ErrEmptyCachedResult reports that a CachedResults node contains no rows.
	ErrEmptyCachedResult = errors.New("CachedResult contains no rows")

	// ErrRowIterDisposed reports an attempt to call RowIter() on a Node that
	// has already been disposed.
	ErrRowIterDisposed = errors.New("attempted to call RowIter() on a disposed Node")
)

// NewCachedResults wraps |n| in a CachedResults plan Node. The node uses a
// RowCache to hold the rows produced by Child.RowIter() and serves those rows
// on later calls to RowIter. Each new node receives a fresh unique Id from
// CachedResultsGlobalCache.
//
// Only use this node when the Child is deterministic and does not depend on
// the |row| parameter passed to RowIter.
func NewCachedResults(n sql.Node) *CachedResults {
	cr := new(CachedResults)
	cr.UnaryNode = UnaryNode{n}
	cr.Id = CachedResultsGlobalCache.allocateUniqueId()
	return cr
}

// CachedResults tees the child node iterator into an in-memory cache
// for faster subsequent retrieval. This is usually combined with a
// HashLookup, whose RowIter defers to a CachedResult child to populate
// rows in memory on a first iteration. The second RowIter moves the
// rows from the memory cache to a hash map attached to HashLookup,
// disposing the CachedResult afterwards.
//
// In the special case where we fill a CachedResult on pass one, but pass
// two never happens, we have to take care not to orphan the cache.
//
// When we exhaust the source, but the cache is empty, subsequent calls
// to RowIter return an ErrEmptyCachedResult error for short-circuiting
// join trees.
//
// When the memory manager cannot accommodate expanding the cache, we
// fall back to a passthrough iterator.
type CachedResults struct {
	UnaryNode
	// Id is the unique identifier assigned by NewCachedResults. It is the key
	// under which this node's cache is looked up and disposed in
	// CachedResultsGlobalCache.
	//
	// Mutex is an exported lock carried by the node.
	// NOTE(review): no method visible in this file locks it — confirm which
	// of the flags below it is meant to guard.
	Id    uint64
	Mutex sync.Mutex
	// NoCache is set when the memory manager is unable to build
	// a cache, so we fall back to a passthrough RowIter
	NoCache bool
	// Finalized is set when we exhaust the child iter, and subsequent
	// RowIters will read from the cache rather than the child
	Finalized bool
	// Disposed is set after this CachedResults is invalidated
	// (Dispose sets it to true)
	Disposed bool
}

// Compile-time checks that *CachedResults satisfies the interfaces it is
// expected to implement.
var (
	_ sql.Node               = (*CachedResults)(nil)
	_ sql.CollationCoercible = (*CachedResults)(nil)
)

// Dispose marks this node as disposed and then asks CachedResultsGlobalCache
// to dispose of, and forget, any cache registered under the node's Id.
func (n *CachedResults) Dispose() {
	n.Disposed = true

	id := n.Id
	CachedResultsGlobalCache.disposeCachedResultsById(id)
}

// String renders this node as a tree: a "CachedResults" header line with the
// child's String() output nested beneath it.
func (n *CachedResults) String() string {
	printer := sql.NewTreePrinter()
	_ = printer.WriteNode("CachedResults")

	child := n.Child.String()
	_ = printer.WriteChildren(child)

	return printer.String()
}

// DebugString renders this node as a tree like String does, but nests the
// child's sql.DebugString output instead of its String() output.
func (n *CachedResults) DebugString() string {
	printer := sql.NewTreePrinter()
	_ = printer.WriteNode("CachedResults")

	child := sql.DebugString(n.Child)
	_ = printer.WriteChildren(child)

	return printer.String()
}

// WithChildren returns a copy of this node whose child is replaced by the
// single node in |children|. The copy keeps the same Id (so it refers to the
// same entry in CachedResultsGlobalCache) and the same NoCache, Finalized and
// Disposed flags, but gets its own zero-valued Mutex.
//
// It returns sql.ErrInvalidChildrenNumber when |children| does not contain
// exactly one node.
func (n *CachedResults) WithChildren(children ...sql.Node) (sql.Node, error) {
	if len(children) != 1 {
		return nil, sql.ErrInvalidChildrenNumber.New(n, len(children), 1)
	}

	// Build the copy field by field rather than with `nn := *n`. A whole
	// struct assignment would also copy Mutex, and a sync.Mutex must not be
	// copied after first use: go vet's copylocks check flags it, and a copy
	// taken while the original is locked would start out locked with no
	// owner to unlock it.
	return &CachedResults{
		UnaryNode: UnaryNode{Child: children[0]},
		Id:        n.Id,
		NoCache:   n.NoCache,
		Finalized: n.Finalized,
		Disposed:  n.Disposed,
	}, nil
}

// CollationCoercibility implements the interface sql.CollationCoercible by
// reporting the collation and coercibility of the child node.
func (n *CachedResults) CollationCoercibility(ctx *sql.Context) (collation sql.CollationID, coercibility byte) {
	collation, coercibility = sql.GetCoercibility(ctx, n.Child)
	return collation, coercibility
}

// GetCachedResults returns the rows that CachedResultsGlobalCache holds under
// this node's Id, or nil when no cache is registered for that Id.
func (n *CachedResults) GetCachedResults() []sql.Row {
	rows := CachedResultsGlobalCache.getCachedResultsById(n.Id)
	return rows
}

// IsReadOnly reports whether the child node is read-only; this node defers
// entirely to its child.
func (n *CachedResults) IsReadOnly() bool {
	readOnly := n.Child.IsReadOnly()
	return readOnly
}

// EmptyIter is the shared emptyCacheIter instance. IsEmptyIter recognises it
// by comparing an iterator against this exact pointer.
var EmptyIter = &emptyCacheIter{}

// IsEmptyIter reports whether |i| is the shared EmptyIter value.
func IsEmptyIter(i sql.RowIter) bool {
	isSentinel := i == EmptyIter
	return isSentinel
}

// emptyCacheIter is a stateless sql.RowIter that yields no rows: Next always
// returns io.EOF and Close does nothing.
type emptyCacheIter struct{}

// Compile-time check that *emptyCacheIter implements sql.RowIter.
var _ sql.RowIter = (*emptyCacheIter)(nil)

// Next always returns a nil row and io.EOF, because this iterator has no rows.
func (i *emptyCacheIter) Next(ctx *sql.Context) (sql.Row, error) {
	return nil, io.EOF
}

// Close is a no-op and always returns nil; the iterator holds no resources.
func (i *emptyCacheIter) Close(ctx *sql.Context) error {
	return nil
}

// cachedResultsManager manages the saved results collected by CachedResults nodes. It is necessary to do this outside
// of the CachedResult node instances themselves, since executing a query plan can make transient transforms that are
// not persisted back and can cause cache memory leaks.
type cachedResultsManager struct {
	// cachedResultsCaches tracks caches used by CachedResults globally so that even if a CachedResult
	// object is copied as part of a transform, its cache can be properly disposed. This is necessary because
	// when evaluating subquery expressions, the query plan is transformed into a new copy that has prependRow nodes
	// in it, but that modified version of the query plan is transient, so the caches need to be held somewhere
	// where they can be properly disposed after the lifetime of an individual subquery expression.
	// Entries are keyed by the ID passed to AddNewCache and are removed by disposeCachedResultsById.
	cachedResultsCaches map[uint64]*cacheDisposeTuple

	// mutex protects the cachedResultsCaches map from concurrent access.
	mutex sync.Mutex

	// cachedResultsUniqueIdCounter stores a counter that should only be incremented atomically and is used
	// as a unique ID when a new CachedResults object is created for a node. allocateUniqueId is the
	// method that increments it.
	cachedResultsUniqueIdCounter uint64
}

// cacheDisposeTuple is a container for a cache and the related function to dispose it.
// One tuple is stored per ID in cachedResultsManager.cachedResultsCaches.
type cacheDisposeTuple struct {
	cache   sql.RowsCache   // its Get() result is what getCachedResultsById returns
	dispose sql.DisposeFunc // called by disposeCachedResultsById before the entry is deleted
}

// NewCachedResultsManager returns a manager with an empty, ready-to-use cache
// map and an ID counter starting at zero.
func NewCachedResultsManager() *cachedResultsManager {
	mgr := new(cachedResultsManager)
	mgr.cachedResultsCaches = map[uint64]*cacheDisposeTuple{}
	return mgr
}

// allocateUniqueId atomically increments the manager's ID counter and returns
// the incremented value.
func (crm *cachedResultsManager) allocateUniqueId() uint64 {
	counter := &crm.cachedResultsUniqueIdCounter
	return atomic.AddUint64(counter, 1)
}

// getCachedResultsById returns the rows held by the cache registered under
// |id|, or nil when no cache is registered for that ID. The lookup runs while
// holding the manager's mutex.
func (crm *cachedResultsManager) getCachedResultsById(id uint64) []sql.Row {
	crm.mutex.Lock()
	defer crm.mutex.Unlock()

	entry, found := crm.cachedResultsCaches[id]
	if !found {
		return nil
	}
	return entry.cache.Get()
}

// AddNewCache registers |cache| and its |dispose| function under |id| while
// holding the manager's mutex. It returns true when the entry was added, and
// false (leaving the existing entry untouched) when |id| already has a cache.
func (crm *cachedResultsManager) AddNewCache(id uint64, cache sql.RowsCache, dispose sql.DisposeFunc) bool {
	crm.mutex.Lock()
	defer crm.mutex.Unlock()

	_, exists := crm.cachedResultsCaches[id]
	if !exists {
		crm.cachedResultsCaches[id] = &cacheDisposeTuple{cache: cache, dispose: dispose}
	}
	return !exists
}

// disposeCachedResultsById calls the dispose function of the cache registered
// under |id| and then removes that entry from the manager. It does nothing
// when no cache is registered for |id|. Both steps run while holding the
// manager's mutex.
func (crm *cachedResultsManager) disposeCachedResultsById(id uint64) {
	crm.mutex.Lock()
	defer crm.mutex.Unlock()

	entry, found := crm.cachedResultsCaches[id]
	if !found {
		return
	}
	entry.dispose()
	delete(crm.cachedResultsCaches, id)
}
